// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! This file defines state store APIs that are related account state Merkle tree.

#[cfg(test)]
mod state_store_test;

use crate::schema::{
    account_state::AccountStateSchema, retired_state_record::RetiredStateRecordSchema,
    state_merkle_node::StateMerkleNodeSchema,
};
use crypto::{hash::CryptoHash, HashValue};
use failure::prelude::*;
use schemadb::{ReadOptions, SchemaBatch, DB};
use sparse_merkle::{node_type::Node, RetiredRecordType, SparseMerkleTree, TreeReader};
use std::{collections::HashMap, sync::Arc};
use types::{
    account_address::AccountAddress,
    account_state_blob::AccountStateBlob,
    proof::{verify_sparse_merkle_element, SparseMerkleProof},
    transaction::Version,
};

pub(crate) struct StateStore {
    db: Arc<DB>,
}

impl StateStore {
    pub fn new(db: Arc<DB>) -> Self {
        Self { db }
    }

    /// Get the account state blob given account address and root hash of state Merkle tree
    pub fn get_account_state_with_proof_by_state_root(
        &self,
        address: AccountAddress,
        root_hash: HashValue,
    ) -> Result<(Option<AccountStateBlob>, SparseMerkleProof)> {
        let (blob, proof) =
            SparseMerkleTree::new(self).get_with_proof(address.hash(), root_hash)?;
        debug_assert!(
            verify_sparse_merkle_element(root_hash, address.hash(), &blob, &proof).is_ok(),
            "Invalid proof."
        );
        Ok((blob, proof))
    }

    /// Put the results generated by `account_state_sets` to `batch` and return the result root
    /// hashes for each write set.
    pub fn put_account_state_sets(
        &self,
        account_state_sets: Vec<HashMap<AccountAddress, AccountStateBlob>>,
        first_version: Version,
        root_hash: HashValue,
        batch: &mut SchemaBatch,
    ) -> Result<Vec<HashValue>> {
        let blob_sets = account_state_sets
            .into_iter()
            .map(|account_states| {
                account_states
                    .into_iter()
                    .map(|(addr, blob)| (addr.hash(), blob))
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        let (new_root_hash_vec, tree_update_batch) =
            SparseMerkleTree::new(self).put_blob_sets(blob_sets, first_version, root_hash)?;
        let (node_batch, blob_batch, retired_record_batch) = tree_update_batch.into();
        node_batch
            .iter()
            .map(|(node_hash, node)| batch.put::<StateMerkleNodeSchema>(node_hash, node))
            .collect::<Result<Vec<()>>>()?;
        blob_batch
            .iter()
            .map(|(blob_hash, blob)| batch.put::<AccountStateSchema>(blob_hash, blob))
            .collect::<Result<Vec<()>>>()?;
        retired_record_batch
            .iter()
            .map(|row| batch.put::<RetiredStateRecordSchema>(row, &()))
            .collect::<Result<Vec<()>>>()?;
        Ok(new_root_hash_vec)
    }

    /// Purges retired account state blobs and sparse Merkle tree nodes. Yields up to `limit`
    /// deletions to `batch` while keeps account states readable at `least readable version` and
    /// beyond.
    #[allow(dead_code)] // TODO: remove
    pub fn purge_retired_records(
        &self,
        least_readable_version: Version,
        limit: usize,
        batch: &mut SchemaBatch,
    ) -> Result<usize> {
        let mut num_purged = 0;

        let mut iter = self
            .db
            .iter::<RetiredStateRecordSchema>(ReadOptions::default())?;
        iter.seek_to_first();
        let mut iter = iter.take(limit);

        while let Some((record, _)) = iter.next().transpose()? {
            // Only records that have retired before or at version `least_readable_version` can be
            // pruned in order to keep that version still readable after pruning.
            if record.version_retired > least_readable_version {
                break;
            }
            match record.record_type {
                RetiredRecordType::Node => {
                    batch.delete::<StateMerkleNodeSchema>(&record.hash)?;
                }
            }
            batch.delete::<RetiredStateRecordSchema>(&record)?;
            num_purged += 1;
        }

        Ok(num_purged)
    }
}

impl TreeReader for StateStore {
    fn get_node(&self, node_hash: HashValue) -> Result<Node> {
        Ok(self
            .db
            .get::<StateMerkleNodeSchema>(&node_hash)?
            .ok_or_else(|| format_err!("Failed to find node with hash {:?}", node_hash))?)
    }

    fn get_blob(&self, blob_hash: HashValue) -> Result<AccountStateBlob> {
        Ok(self
            .db
            .get::<AccountStateSchema>(&blob_hash)?
            .ok_or_else(|| {
                format_err!(
                    "Failed to find account state blob with hash {:?}",
                    blob_hash
                )
            })?)
    }
}
